package org.niit.handler

import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.niit.bean.{AdClickData, Answer}
import org.niit.util.{MyKafkaUtil, SparkUtil}

/**
 * Kafka ingestion helpers: turn raw Kafka topics into typed DStreams.
 *
 * All streams share the single StreamingContext obtained from
 * [[org.niit.util.SparkUtil.takeSSC]]; call [[startAndAwait]] once after
 * all handlers are wired up.
 */
object DataHandler {

  // Shared streaming context for every handler in this object.
  val ssc = SparkUtil.takeSSC()

  /**
   * Consumes ad-click events from Kafka and parses them into [[AdClickData]].
   *
   * Each record value is expected to be a space-separated line of exactly
   * five fields: timestamp, area, city, userId, adId.
   * NOTE(review): records with fewer than 5 fields will throw
   * ArrayIndexOutOfBoundsException — assumes the producer guarantees the format.
   *
   * @param groupId Kafka consumer group id
   * @param topic   Kafka topic to subscribe to
   * @return a DStream of parsed ad-click events
   */
  def kafkaAdDataHandler(groupId: String, topic: String): DStream[AdClickData] = {

    // Pull the raw ad-click records from Kafka.
    val kfDataDS: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(groupId, topic, ssc)
    val adClickData: DStream[AdClickData] = kfDataDS.map(kafkaData => {
      val data = kafkaData.value()
      val datas = data.split(" ")
      //          timestamp  area      city      userId    adId
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })
    adClickData
  }

  /**
   * Consumes answer events from Kafka and deserializes the JSON payload
   * into [[Answer]] objects via Gson.
   *
   * The frontend sends answers as JSON strings; Gson maps each string onto
   * the `Answer` bean (field names must match the JSON keys).
   *
   * @param groupId Kafka consumer group id
   * @param topic   Kafka topic to subscribe to
   * @return a DStream of deserialized answers
   */
  def kafkaAnswerDataHandler(groupId: String, topic: String): DStream[Answer] = {

    val kfDataDS: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(groupId, topic, ssc)
    // Use mapPartitions so Gson is instantiated once per partition instead of
    // once per record (the original allocated a new Gson for every message).
    // Gson is not Serializable, so it must be created inside the closure,
    // not hoisted to a field of this object.
    val answerData: DStream[Answer] = kfDataDS.mapPartitions { records =>
      val gson = new Gson()
      records.map(kafkaData => gson.fromJson(kafkaData.value(), classOf[Answer]))
    }
    answerData
  }

  /**
   * Starts the streaming context and blocks the calling thread until
   * the computation terminates. Call exactly once, after all handlers
   * have been registered.
   */
  def startAndAwait(): Unit = {
    ssc.start()
    ssc.awaitTermination()
  }

}
